ShuffleMapStage — Intermediate Shuffle Map Stage in Job

A ShuffleMapStage (aka shuffle map stage, or simply map stage) is an intermediate stage in the execution DAG that produces data for a shuffle operation. It is the input for the stages that follow it in the DAG of stages. That is why it is also called a shuffle dependency’s map side.

Tip
Read about ShuffleDependency.

A ShuffleMapStage may contain multiple pipelined operations, e.g. map and filter, before the shuffle operation.

Caution
FIXME: Show the example and the logs + figures

A ShuffleMapStage can be part of many jobs — refer to the section ShuffleMapStage sharing.

A ShuffleMapStage is a stage with a ShuffleDependency (the shuffle that it is part of) and the outputLocs and _numAvailableOutputs internal registries that track how many map outputs are ready.

Note
ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage for Adaptive Query Planning / Adaptive Scheduling.

When executed, a ShuffleMapStage saves map output files that can later be fetched by reduce tasks. When all map outputs are available, the ShuffleMapStage is considered available (or ready).

Caution
FIXME Figure with ShuffleMapStages saving files

The output locations (outputLocs) of a ShuffleMapStage are the same as used by its ShuffleDependency. Output locations can be missing, i.e. partitions have not been cached or are lost.

A ShuffleMapStage is registered with the DAGScheduler, which tracks the mapping of shuffles (by their ids from SparkContext) to the corresponding ShuffleMapStages that compute them in the shuffleToMapStage internal registry.

A ShuffleMapStage is created from an input ShuffleDependency and a job’s id (in DAGScheduler#newOrUsedShuffleStage).

Caution
FIXME Where’s shuffleToMapStage used?
  • getShuffleMapStage - see Stage sharing

  • getAncestorShuffleDependencies

When there is no ShuffleMapStage for a shuffle id (of a ShuffleDependency), one is created for it and for the ancestor shuffle dependencies of the RDD (of the ShuffleDependency), and the shuffles are registered with MapOutputTrackerMaster.

Caution
FIXME Where is ShuffleMapStage used?

  • newShuffleMapStage - the proper way to create shuffle map stages (with the additional setup steps)

  • getShuffleMapStage - see Stage sharing


Table 1. ShuffleMapStage Internal Registries and Counters

outputLocs

Tracks the MapStatuses for each partition.

There could be many MapStatus entries per partition due to Speculative Execution of Tasks.

When a ShuffleMapStage is created, outputLocs is empty, i.e. all elements are empty lists.

The size of outputLocs is exactly the number of partitions of the RDD the stage runs on.

_numAvailableOutputs

The number of partitions that have map outputs available.

_numAvailableOutputs increments when the first MapStatus is registered for a partition and decrements when the last MapStatus is removed for a partition.

_numAvailableOutputs should never be greater than the number of partitions (and hence the number of MapStatus collections in the outputLocs internal registry).
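The two registries can be modeled with a short sketch. This is simplified, hypothetical code (not Spark’s actual implementation); MapStatus is reduced to just the address of the BlockManager that holds the map output.

```scala
// Simplified, hypothetical sketch of the two registries (not Spark's actual code).
// MapStatus is reduced to the address of the BlockManager with the map output.
case class MapStatus(bmAddress: String)

class ShuffleMapStageSketch(numPartitions: Int) {
  // outputLocs: one (possibly empty) list of MapStatuses per partition
  val outputLocs: Array[List[MapStatus]] = Array.fill(numPartitions)(Nil)

  // _numAvailableOutputs: how many partitions have at least one map output
  var numAvailableOutputs: Int = 0

  // the stage is available (ready) once every partition has an output
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}

val stage = new ShuffleMapStageSketch(3)
// freshly created: all lists are empty and the stage is not yet available
println(stage.outputLocs.forall(_.isEmpty)) // true
println(stage.isAvailable)                  // false
```

The invariant from the table holds by construction here: the counter only counts partitions with non-empty lists, so it can never exceed the number of partitions.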

removeOutputsOnExecutor Method

Caution
FIXME

outputLocInMapOutputTrackerFormat Method

Caution
FIXME

addActiveJob Method

Caution
FIXME

Creating ShuffleMapStage Instance

Caution
FIXME

mapStageJobs Method

Caution
FIXME

shuffleDep Property

Caution
FIXME

removeActiveJob Method

Caution
FIXME

Registering MapStatus For Partition — addOutputLoc Method

addOutputLoc(partition: Int, status: MapStatus): Unit

addOutputLoc adds the input status to the output locations for the input partition.

addOutputLoc increments _numAvailableOutputs internal counter if the input MapStatus is the first result for the partition.
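A minimal sketch of that behavior (hypothetical code, not the actual implementation; MapStatus is again reduced to a BlockManager address):

```scala
// Hypothetical sketch of addOutputLoc (not Spark's actual code)
case class MapStatus(bmAddress: String)

class Registry(numPartitions: Int) {
  val outputLocs: Array[List[MapStatus]] = Array.fill(numPartitions)(Nil)
  var numAvailableOutputs: Int = 0

  // prepend the status; the counter moves only when the partition gains
  // its first result (later, speculative results do not count again)
  def addOutputLoc(partition: Int, status: MapStatus): Unit = {
    val prevList = outputLocs(partition)
    outputLocs(partition) = status :: prevList
    if (prevList.isEmpty) numAvailableOutputs += 1
  }
}

val reg = new Registry(2)
reg.addOutputLoc(0, MapStatus("exec-1"))
reg.addOutputLoc(0, MapStatus("exec-2")) // speculative duplicate for partition 0
println(reg.numAvailableOutputs) // 1 -- partition 0 is counted once
```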

Removing MapStatus For Partition And BlockManager — removeOutputLoc Method

removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit

removeOutputLoc removes the MapStatus for the input partition and bmAddress BlockManager from the output locations.

removeOutputLoc decrements the _numAvailableOutputs internal counter if the removed MapStatus was the last result for the partition.

Note
removeOutputLoc is exclusively used when a Task has failed with FetchFailed exception.
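The removal side can be sketched the same way (hypothetical code, not the actual implementation): only losing the last remaining result for a partition moves the counter.

```scala
// Hypothetical sketch of removeOutputLoc (not Spark's actual code)
case class MapStatus(bmAddress: String)

class Registry(numPartitions: Int) {
  val outputLocs: Array[List[MapStatus]] = Array.fill(numPartitions)(Nil)
  var numAvailableOutputs: Int = 0

  def addOutputLoc(partition: Int, status: MapStatus): Unit = {
    val prev = outputLocs(partition)
    outputLocs(partition) = status :: prev
    if (prev.isEmpty) numAvailableOutputs += 1
  }

  // drop the statuses registered for the given BlockManager; the counter
  // moves only when the partition loses its last remaining result
  def removeOutputLoc(partition: Int, bmAddress: String): Unit = {
    val prev = outputLocs(partition)
    val next = prev.filterNot(_.bmAddress == bmAddress)
    outputLocs(partition) = next
    if (prev.nonEmpty && next.isEmpty) numAvailableOutputs -= 1
  }
}

val reg = new Registry(1)
reg.addOutputLoc(0, MapStatus("exec-1"))
reg.addOutputLoc(0, MapStatus("exec-2"))
reg.removeOutputLoc(0, "exec-1") // exec-2's result still remains
println(reg.numAvailableOutputs) // 1
reg.removeOutputLoc(0, "exec-2") // last result for partition 0 is gone
println(reg.numAvailableOutputs) // 0
```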

Finding Missing Partitions — findMissingPartitions Method

findMissingPartitions(): Seq[Int]

Note
findMissingPartitions is a part of the Stage contract that returns the partitions that are missing, i.e. are yet to be computed.

Internally, findMissingPartitions uses outputLocs internal registry to find indices with empty lists of MapStatus.
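In terms of the registry sketch used above (hypothetical code, not the actual implementation), the lookup is a simple filter over partition indices:

```scala
// Hypothetical sketch of findMissingPartitions (not Spark's actual code)
case class MapStatus(bmAddress: String)

class Registry(numPartitions: Int) {
  val outputLocs: Array[List[MapStatus]] = Array.fill(numPartitions)(Nil)

  // missing partitions are exactly the indices with empty MapStatus lists
  def findMissingPartitions(): Seq[Int] =
    (0 until numPartitions).filter(p => outputLocs(p).isEmpty).toList
}

val reg = new Registry(3)
reg.outputLocs(1) = List(MapStatus("exec-1")) // only partition 1 has an output
println(reg.findMissingPartitions()) // List(0, 2)
```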

ShuffleMapStage Sharing

A ShuffleMapStage can be shared across multiple jobs, if these jobs reuse the same RDDs.

When a ShuffleMapStage is submitted to DAGScheduler to execute, getShuffleMapStage is called.

scala> val rdd = sc.parallelize(0 to 5).map((_,1)).sortByKey()  (1)

scala> rdd.count  (2)

scala> rdd.count  (3)
  1. Shuffle at sortByKey()

  2. Submits a job with two stages, both of which are executed

  3. Intentionally repeats the last action, which submits a new job with two stages, one of them skipped as already computed

Figure 1. Skipped Stages are already-computed ShuffleMapStages
